home *** CD-ROM | disk | FTP | other *** search
/ Personal Computer World 2009 February / PCWFEB09.iso / Software / Resources / Chat & Communication / Digsby build 37 / digsby_setup.exe / lib / util / threads / threadpool.pyo (.txt) < prev    next >
Python Compiled Bytecode  |  2008-10-13  |  8KB  |  275 lines

  1. # Source Generated with Decompyle++
  2. # File: in.pyo (Python 2.5)
  3.  
  4. from __future__ import with_statement
  5. from util.introspect import use_profiler
  6. __all__ = [
  7.     'makeRequests',
  8.     'NoResultsPending',
  9.     'NoWorkersAvailable',
  10.     'ThreadPool',
  11.     'WorkRequest',
  12.     'WorkerThread']
  13. __author__ = 'Christopher Arndt'
  14. __version__ = '1.2.3'
  15. __revision__ = '$Revision: 1.5 $'
  16. __date__ = '$Date: 2006/06/23 12:32:25 $'
  17. __license__ = 'Python license'
  18. import sys
  19. import threading
  20. import Queue
  21. from util.introspect import callany
  22. from bgthread import BackgroundThread
  23.  
  24. def requesthash(request):
  25.     return (request, getattr(request, 'im_self', None))
  26.  
  27.  
  28. class NoResultsPending(Exception):
  29.     pass
  30.  
  31.  
  32. class NoWorkersAvailable(Exception):
  33.     pass
  34.  
  35.  
  36. class WorkerThread(BackgroundThread):
  37.     
  38.     def __init__(self, threadPool, **kwds):
  39.         if 'name' not in kwds:
  40.             kwds['name'] = threading._newname('Wkr%d')
  41.         
  42.         BackgroundThread.__init__(self, **kwds)
  43.         self.setDaemon(1)
  44.         self.workRequestQueue = threadPool.requestsQueue
  45.         self.resultQueue = threadPool.resultsQueue
  46.         self.runningNow = threadPool.runningNow
  47.         self.runningNowLock = threadPool.runningNowLock
  48.         self._dismissed = threading.Event()
  49.         self.start()
  50.  
  51.     
  52.     def run(self):
  53.         self.BeforeRun()
  54.         use_profiler(self, self._run)
  55.         self.AfterRun()
  56.  
  57.     
  58.     def _run(self):
  59.         while not self._dismissed.isSet():
  60.             setattr(self, 'loopcount', getattr(self, 'loopcount', 0) + 1)
  61.             request = self.workRequestQueue.get()
  62.             if self._dismissed.isSet():
  63.                 self.workRequestQueue.put(request)
  64.                 break
  65.             
  66.             
  67.             try:
  68.                 result = request.callable(*request.args, **request.kwds)
  69.             except Exception:
  70.                 e = None
  71.                 if request.verbose:
  72.                     import traceback as traceback
  73.                     import sys as sys
  74.                     print >>sys.stderr, 'threadpool: this error is being passed to exception handler (or being ignored):\n'
  75.                     traceback.print_exc()
  76.                 
  77.                 request.exception = True
  78.                 request.exception_instance = e
  79.                 result = None
  80.  
  81.             if request.exception and request.exc_callback:
  82.                 callany(request.exc_callback, request.exception_instance)
  83.             
  84.             if request.callback:
  85.                 if request.exception:
  86.                     pass
  87.                 if not (request.exc_callback):
  88.                     callany(request.callback, result)
  89.                 
  90.             self.runningNowLock.__enter__()
  91.             
  92.             try:
  93.                 self.runningNow.discard(requesthash(request.callable))
  94.             finally:
  95.                 pass
  96.  
  97.             continue
  98.             self.runningNowLock
  99.  
  100.     
  101.     def dismiss(self):
  102.         self._dismissed.set()
  103.  
  104.  
  105.  
  106. class WorkRequest(object):
  107.     
  108.     def __init__(self, callable, args = None, kwds = None, requestID = None, callback = None, exc_callback = None):
  109.         if requestID is None:
  110.             self.requestID = id(self)
  111.         else:
  112.             
  113.             try:
  114.                 hash(requestID)
  115.             except TypeError:
  116.                 raise TypeError('requestID must be hashable.')
  117.  
  118.             self.requestID = requestID
  119.         self.exception = False
  120.         self.callback = callback
  121.         self.exc_callback = exc_callback
  122.         self.callable = callable
  123.         if not args:
  124.             pass
  125.         self.args = []
  126.         if not kwds:
  127.             pass
  128.         self.kwds = { }
  129.  
  130.  
  131.  
  132. class ThreadPool(object):
  133.     requestsQueue = Queue.Queue()
  134.     resultsQueue = Queue.Queue()
  135.     runningNow = set()
  136.     runningNowLock = threading.RLock()
  137.     workers = []
  138.     
  139.     def __init__(self, num_workers = 0, q_size = 0):
  140.         self.requestsQueue.maxsize = q_size
  141.         self.createWorkers(num_workers)
  142.  
  143.     
  144.     def createWorkers(self, num_workers):
  145.         for i in range(num_workers):
  146.             self.workers.append(WorkerThread(self))
  147.         
  148.  
  149.     
  150.     def joinAll(self):
  151.         for worker in self.workers:
  152.             worker.dismiss()
  153.         
  154.         threaded = threaded
  155.         import threadpool2
  156.         threaded((lambda : pass))()
  157.         for worker in self.workers:
  158.             worker.join()
  159.         
  160.  
  161.     
  162.     def dismissWorkers(self, num_workers):
  163.         for i in range(min(num_workers, len(self.workers))):
  164.             worker = self.workers.pop()
  165.             worker.dismiss()
  166.         
  167.  
  168.     
  169.     def __contains__(self, request_method):
  170.         h = requesthash(request_method)
  171.         self.runningNowLock.__enter__()
  172.         
  173.         try:
  174.             return h in self.runningNow
  175.         finally:
  176.             pass
  177.  
  178.  
  179.     
  180.     def putRequest(self, request, block = True, timeout = 0):
  181.         self.runningNowLock.__enter__()
  182.         
  183.         try:
  184.             self.runningNow.add(requesthash(request.callable))
  185.         finally:
  186.             pass
  187.  
  188.         self.requestsQueue.put(request, block, timeout)
  189.  
  190.     
  191.     def wait(self):
  192.         while None:
  193.             
  194.             try:
  195.                 self.poll(True)
  196.             continue
  197.             except NoResultsPending:
  198.                 break
  199.                 continue
  200.             
  201.  
  202.             return None
  203.  
  204.  
  205.  
  206. def makeRequests(callable, args_list, callback = None, exc_callback = None):
  207.     requests = []
  208.     for item in args_list:
  209.         if isinstance(item, tuple):
  210.             requests.append(WorkRequest(callable, item[0], item[1], callback = callback, exc_callback = exc_callback))
  211.             continue
  212.         requests.append(WorkRequest(callable, [
  213.             item], None, callback = callback, exc_callback = exc_callback))
  214.     
  215.     return requests
  216.  
  217. if __name__ == '__main__':
  218.     import random
  219.     import time
  220.     
  221.     def do_something(data):
  222.         time.sleep(random.randint(1, 5))
  223.         result = round(random.random() * data, 5)
  224.         if result > 3:
  225.             raise RuntimeError('Something extraordinary happened!')
  226.         
  227.         return result
  228.  
  229.     
  230.     def print_result(request, result):
  231.         print '**Result: %s from request #%s' % (result, request.requestID)
  232.  
  233.     
  234.     def handle_exception(request, exc_info):
  235.         print 'Exception occured in request #%s: %s' % (request.requestID, exc_info[1])
  236.  
  237.     data = [ random.randint(1, 10) for i in range(20) ]
  238.     requests = makeRequests(do_something, data, print_result, handle_exception)
  239.     data = [ ((random.randint(1, 10),), { }) for i in range(20) ]
  240.     requests.extend(makeRequests(do_something, data, print_result, handle_exception))
  241.     main = ThreadPool(3)
  242.     for req in requests:
  243.         main.putRequest(req)
  244.         print 'Work request #%s added.' % req.requestID
  245.     
  246.     i = 0
  247.     while None:
  248.         
  249.         try:
  250.             main.poll()
  251.             print 'Main thread working...'
  252.             time.sleep(0.5)
  253.             if i == 10:
  254.                 print 'Adding 3 more worker threads...'
  255.                 main.createWorkers(3)
  256.             
  257.             i += 1
  258.         continue
  259.         except KeyboardInterrupt:
  260.             []
  261.             []
  262.             []
  263.             print 'Interrupted!'
  264.             break
  265.             continue
  266.             except NoResultsPending:
  267.                 print 'All results collected.'
  268.                 break
  269.                 continue
  270.             
  271.         except:
  272.             None<EXCEPTION MATCH>NoResultsPending
  273.             return None
  274.  
  275.